RocketRegressor (sktime-style) — ROCKET features + RidgeCV

RocketRegressor (sktime-style) — ROCKET features + RidgeCV

ROCKET (RandOm Convolutional KErnel Transform) turns each time series window into a large feature vector by applying many random 1D convolutions and extracting two summary stats per kernel:

  • \(\max\) of the convolution output

  • PPV = proportion of positive values

Then a fast linear model (often RidgeCV) is fit on those features.

This notebook implements a runnable reference RocketRegressor(num_kernels=..., ...) inspired by sktime’s wrapper.

Why ROCKET works (intuition)

Random convolutions act like a large, diverse set of pattern detectors:

  • short/long kernels react to different scales

  • dilation creates multi-scale receptive fields

  • max captures presence of a pattern; PPV captures how often it’s present

With enough random kernels, a linear model can fit surprisingly rich functions.

import numpy as np
import pandas as pd

import plotly.graph_objects as go
from plotly.subplots import make_subplots
import os
import plotly.io as pio

from sklearn.linear_model import RidgeCV, Ridge
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

from scipy import stats

# Plotly output target: honor $PLOTLY_RENDERER if set, else render inline in the notebook.
pio.renderers.default = os.environ.get("PLOTLY_RENDERER", "notebook")
pio.templates.default = "plotly_white"

# Module-level RNG used for the synthetic series (fixed seed -> reproducible notebook).
rng = np.random.default_rng(7)

# Record the library versions this notebook was executed with.
import numpy, pandas, sklearn, scipy, plotly

for _name, _module in (
    ("numpy", numpy),
    ("pandas", pandas),
    ("sklearn", sklearn),
    ("scipy", scipy),
    ("plotly", plotly),
):
    print(f"{_name}: {_module.__version__}")
numpy: 1.26.2
pandas: 2.1.3
sklearn: 1.6.0
scipy: 1.15.0
plotly: 6.5.2
def _as_3d_panel(X: np.ndarray) -> np.ndarray:
    """Accept (n, m) or (n, d, m). Return (n, d, m)."""
    X = np.asarray(X, dtype=float)
    if X.ndim == 2:
        return X[:, None, :]
    if X.ndim == 3:
        return X
    raise ValueError(f"X must be 2D or 3D, got shape={X.shape}")


def _acf(x: np.ndarray, max_lag: int) -> tuple[np.ndarray, np.ndarray]:
    x = np.asarray(x, dtype=float)
    x = x - x.mean()
    denom = float(np.dot(x, x))
    lags = np.arange(max_lag + 1)
    values = np.zeros(max_lag + 1)
    values[0] = 1.0
    if denom == 0.0:
        return lags, values
    for k in range(1, max_lag + 1):
        values[k] = float(np.dot(x[k:], x[:-k]) / denom)
    return lags, values


def make_sliding_windows(y: np.ndarray, window_length: int) -> tuple[np.ndarray, np.ndarray]:
    """Reduce a series to (X, y_next): each row of X is a length-`window_length`
    window and y_next is the value immediately after that window.

    Raises ValueError when the series has no complete (window, target) pair.
    """
    series = np.asarray(y, dtype=float)
    win = int(window_length)
    if series.size <= win:
        raise ValueError("y is too short")
    n_windows = series.size - win
    X = np.stack([series[start : start + win] for start in range(n_windows)])
    return X, series[win:]
class RocketTransformer:
    """ROCKET feature map: random dilated convolutions -> [max, ppv] features per kernel.

    `fit` samples `num_kernels` random kernels (length, weights, bias, dilation,
    padding, input channel); `transform` applies each kernel to every series and
    emits two features per kernel: the max of the convolution output and the
    proportion of positive values (PPV).  Output shape: (n, 2 * num_kernels).
    """

    def __init__(
        self,
        *,
        num_kernels: int = 500,
        kernel_sizes: tuple[int, ...] = (7, 9, 11),
        random_state: int | None = None,
    ):
        self.num_kernels = int(num_kernels)
        self.kernel_sizes = tuple(int(k) for k in kernel_sizes)
        self.random_state = random_state

        # Set by fit(): sampled kernel parameter dicts and the expected input shape.
        self.kernels_: list[dict] | None = None
        self.n_timepoints_: int | None = None
        self.n_dims_: int | None = None

    def fit(self, X, y=None):
        """Sample the random kernels; `y` is ignored (transformer API)."""
        X3 = _as_3d_panel(X)
        n, d, m = X3.shape
        # Remember the panel shape so transform() can validate its input.
        self.n_timepoints_ = int(m)
        self.n_dims_ = int(d)

        r = np.random.default_rng(self.random_state)
        kernels: list[dict] = []

        for _ in range(self.num_kernels):
            k = int(r.choice(self.kernel_sizes))
            # Each kernel reads a single randomly chosen input channel.
            dim = int(r.integers(0, d))

            # Dilation: choose from powers of two up to what fits
            if m <= k:
                dilation = 1
            else:
                # Largest dilation such that the dilated kernel span fits in m.
                max_d = (m - 1) // (k - 1)
                max_pow = int(np.floor(np.log2(max_d))) if max_d > 0 else 0
                dilation = int(2 ** r.integers(0, max_pow + 1))

            # With probability 1/2, use "same"-style zero padding on both ends.
            padding = 0
            if bool(r.integers(0, 2)):
                padding = int(((k - 1) * dilation) // 2)

            # Mean-centered Gaussian weights and a uniform bias, as in ROCKET.
            weights = r.normal(0.0, 1.0, size=k)
            weights = weights - weights.mean()
            bias = float(r.uniform(-1.0, 1.0))

            kernels.append(
                {
                    "length": k,
                    "weights": weights.astype(float),
                    "bias": bias,
                    "dilation": dilation,
                    "padding": padding,
                    "dim": dim,
                }
            )

        self.kernels_ = kernels
        return self

    def transform(self, X) -> np.ndarray:
        """Apply every fitted kernel; return (n, 2 * num_kernels) features."""
        if self.kernels_ is None or self.n_timepoints_ is None or self.n_dims_ is None:
            raise RuntimeError("Call fit() before transform().")

        X3 = _as_3d_panel(X)
        n, d, m = X3.shape
        if m != self.n_timepoints_ or d != self.n_dims_:
            raise ValueError(
                f"X must have shape (n,{self.n_dims_},{self.n_timepoints_}) (or (n,{self.n_timepoints_})), got {X3.shape}"
            )

        # Two features (max, ppv) per kernel, interleaved: [max_0, ppv_0, max_1, ...].
        features = np.empty((n, 2 * len(self.kernels_)), dtype=float)

        for i, kinfo in enumerate(self.kernels_):
            k = int(kinfo["length"])
            w = kinfo["weights"]
            b = float(kinfo["bias"])
            dilation = int(kinfo["dilation"])
            padding = int(kinfo["padding"])
            dim = int(kinfo["dim"])

            # Select this kernel's channel for all series at once: (n, m).
            x = X3[:, dim, :]
            if padding > 0:
                x = np.pad(x, ((0, 0), (padding, padding)), mode="constant")

            m_pad = x.shape[1]
            # Number of valid dilated-convolution positions.
            out_len = m_pad - dilation * (k - 1)
            if out_len <= 0:
                # Degenerate: no valid positions, fall back to zeros
                max_val = np.zeros(n)
                ppv = np.zeros(n)
            else:
                # Gather all dilated windows via fancy indexing, then contract
                # against the kernel weights in one tensordot.
                starts = np.arange(out_len)[:, None]
                idx = starts + dilation * np.arange(k)[None, :]
                seg = x[:, idx]  # (n, out_len, k)
                conv = np.tensordot(seg, w, axes=([2], [0])) + b  # (n, out_len)
                max_val = conv.max(axis=1)
                ppv = (conv > 0).mean(axis=1)

            features[:, 2 * i] = max_val
            features[:, 2 * i + 1] = ppv

        return features

    def fit_transform(self, X, y=None) -> np.ndarray:
        """Convenience: fit the kernels on X, then transform the same X."""
        return self.fit(X, y=y).transform(X)
class RocketRegressor:
    """ROCKET + RidgeCV regressor (sktime-style wrapper).

    Transforms each input series into ROCKET features and fits a
    cross-validated ridge regression on top of them.
    """

    def __init__(
        self,
        *,
        num_kernels: int = 500,
        kernel_sizes: tuple[int, ...] = (7, 9, 11),
        alphas: np.ndarray | None = None,
        random_state: int | None = None,
    ):
        # Store hyper-parameters; default to a log-spaced ridge-alpha grid.
        self.num_kernels = int(num_kernels)
        self.kernel_sizes = tuple(int(k) for k in kernel_sizes)
        if alphas is None:
            self.alphas = np.logspace(-3, 3, 13)
        else:
            self.alphas = np.asarray(alphas, dtype=float)
        self.random_state = random_state

        # Pipeline pieces: random-kernel featurizer and cross-validated ridge head.
        self.transformer_ = RocketTransformer(
            num_kernels=self.num_kernels,
            kernel_sizes=self.kernel_sizes,
            random_state=self.random_state,
        )
        self.regressor_ = RidgeCV(alphas=self.alphas)

    def fit(self, X, y):
        """Fit ROCKET kernels on X, then the ridge head on the features."""
        targets = np.asarray(y, dtype=float)
        feats = self.transformer_.fit_transform(X)
        self.regressor_.fit(feats, targets)
        return self

    def predict(self, X) -> np.ndarray:
        """Featurize X with the fitted kernels and predict with the ridge head."""
        return self.regressor_.predict(self.transformer_.transform(X))

Demo: one-step and recursive forecasts via sliding windows

We generate a multi-seasonal series, build a sliding-window regression dataset, and compare:

  • RocketRegressor

  • a baseline Ridge on raw lag features

def simulate_ar1_noise(n: int, *, phi: float, sigma: float, rng: np.random.Generator) -> np.ndarray:
    """Simulate an AR(1) process u_t = phi * u_{t-1} + eps_t, u_0 = 0.

    Innovations eps are N(0, sigma) drawn in a single batch from `rng`
    (so the RNG is consumed exactly once).
    """
    shocks = rng.normal(0.0, sigma, size=n)
    series = np.zeros(n)
    for step in range(1, n):
        series[step] = phi * series[step - 1] + shocks[step]
    return series


# --- Synthetic daily series: trend + weekly + monthly seasonality + AR(1) noise ---
n = 650
idx = pd.date_range("2021-01-01", periods=n, freq="D")
t = np.arange(n)

# Two sinusoidal components at different periods (7-day and 30-day cycles).
weekly = 2.0 * np.sin(2 * np.pi * t / 7) + 0.6 * np.cos(2 * np.pi * t / 7)
monthly = 1.2 * np.sin(2 * np.pi * t / 30) - 0.4 * np.cos(2 * np.pi * t / 30)
trend = 0.015 * t  # slow linear drift
noise = simulate_ar1_noise(n, phi=0.7, sigma=0.7, rng=rng)  # autocorrelated noise

# Base level 40 plus all components, wrapped in a date-indexed Series.
y = 40.0 + trend + weekly + monthly + noise
y = pd.Series(y, index=idx, name="y")

fig = go.Figure()
fig.add_trace(go.Scatter(x=y.index, y=y, name="y", line=dict(color="black")))
fig.update_layout(title="Synthetic multi-seasonal series", xaxis_title="date", yaxis_title="value")
fig.show()
# --- Reduction to tabular regression via sliding windows of length L ---
L = 60
X, y_next = make_sliding_windows(y.to_numpy(), window_length=L)

# Hold out the last h targets as the test horizon (one-step-ahead evaluation).
h = 120
X_train, y_train = X[:-h], y_next[:-h]
X_test, y_test = X[-h:], y_next[-h:]
t_test = y.index[-h:]

# ROCKET features + RidgeCV ...
rocket = RocketRegressor(num_kernels=600, random_state=7)
rocket.fit(X_train, y_train)
pred_rocket = rocket.predict(X_test)

# ... versus a plain Ridge fit directly on the raw lag values.
ridge = Ridge(alpha=1.0)
ridge.fit(X_train, y_train)
pred_ridge = ridge.predict(X_test)

def summarize(name: str, y_true: np.ndarray, y_pred: np.ndarray) -> None:
    """Print MAE, RMSE, and R^2 for predictions vs. ground truth.

    NOTE: the `squared=False` keyword of mean_squared_error was deprecated
    in scikit-learn 1.4 and removed in 1.6 (it raises TypeError there, as
    the traceback below shows), so RMSE is computed as sqrt(MSE) explicitly —
    this works on every scikit-learn version.
    """
    mae = mean_absolute_error(y_true, y_pred)
    rmse = float(np.sqrt(mean_squared_error(y_true, y_pred)))
    r2 = r2_score(y_true, y_pred)
    print(f"{name}: MAE={mae:.3f} RMSE={rmse:.3f} R^2={r2:.3f}")


summarize("ROCKET+RidgeCV", y_test, pred_rocket)
summarize("Ridge on lags", y_test, pred_ridge)
print("Chosen alpha (ROCKET RidgeCV):", float(rocket.regressor_.alpha_))
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[6], line 24
     20     r2 = r2_score(y_true, y_pred)
     21     print(f"{name}: MAE={mae:.3f} RMSE={rmse:.3f} R^2={r2:.3f}")
---> 24 summarize("ROCKET+RidgeCV", y_test, pred_rocket)
     25 summarize("Ridge on lags", y_test, pred_ridge)
     26 print("Chosen alpha (ROCKET RidgeCV):", float(rocket.regressor_.alpha_))

Cell In[6], line 19, in summarize(name, y_true, y_pred)
     17 def summarize(name: str, y_true: np.ndarray, y_pred: np.ndarray) -> None:
     18     mae = mean_absolute_error(y_true, y_pred)
---> 19     rmse = mean_squared_error(y_true, y_pred, squared=False)
     20     r2 = r2_score(y_true, y_pred)
     21     print(f"{name}: MAE={mae:.3f} RMSE={rmse:.3f} R^2={r2:.3f}")

File ~/miniconda3/lib/python3.12/site-packages/sklearn/utils/_param_validation.py:194, in validate_params.<locals>.decorator.<locals>.wrapper(*args, **kwargs)
    191 func_sig = signature(func)
    193 # Map *args/**kwargs to the function signature
--> 194 params = func_sig.bind(*args, **kwargs)
    195 params.apply_defaults()
    197 # ignore self/cls and positional/keyword markers

File ~/miniconda3/lib/python3.12/inspect.py:3277, in Signature.bind(self, *args, **kwargs)
   3272 def bind(self, /, *args, **kwargs):
   3273     """Get a BoundArguments object, that maps the passed `args`
   3274     and `kwargs` to the function's signature.  Raises `TypeError`
   3275     if the passed arguments can not be bound.
   3276     """
-> 3277     return self._bind(args, kwargs)

File ~/miniconda3/lib/python3.12/inspect.py:3266, in Signature._bind(self, args, kwargs, partial)
   3256         raise TypeError(
   3257             'got some positional-only arguments passed as '
   3258             'keyword arguments: {arg!r}'.format(
   (...)   3263             ),
   3264         )
   3265     else:
-> 3266         raise TypeError(
   3267             'got an unexpected keyword argument {arg!r}'.format(
   3268                 arg=next(iter(kwargs))))
   3270 return self._bound_arguments_cls(self, arguments)

TypeError: got an unexpected keyword argument 'squared'
# Overlay the actual test-horizon values with both models' one-step-ahead predictions.
fig = go.Figure()
fig.add_trace(go.Scatter(x=t_test, y=y_test, name="actual", line=dict(color="black")))
fig.add_trace(go.Scatter(x=t_test, y=pred_rocket, name="ROCKET+RidgeCV", line=dict(color="#59A14F")))
fig.add_trace(go.Scatter(x=t_test, y=pred_ridge, name="Ridge(lags)", line=dict(color="#E15759", dash="dot")))
fig.update_layout(title="One-step-ahead predictions", xaxis_title="date", yaxis_title="value")
fig.show()
def recursive_forecast(model, history: np.ndarray, steps: int, window_length: int) -> np.ndarray:
    """Multi-step forecast by recursion: predict one step, append the
    prediction to the history, and repeat `steps` times.

    `model` must expose predict() over rows of shape (1, window_length).
    Returns the `steps` predictions as a 1-D array.
    """
    buffer = np.asarray(history, dtype=float).tolist()
    forecasts: list[float] = []
    for _ in range(int(steps)):
        window = np.asarray(buffer[-window_length:], dtype=float).reshape(1, -1)
        next_value = float(model.predict(window)[0])
        forecasts.append(next_value)
        buffer.append(next_value)
    return np.asarray(forecasts)


# Recursive multi-step forecast: start at the beginning of the held-out
# horizon and roll the ROCKET model forward h2 steps on its own predictions.
h2 = 45
start = len(y) - h  # first index of the test horizon
hist = y.to_numpy()[:start]
truth = y.to_numpy()[start : start + h2]
t_h2 = y.index[start : start + h2]

rec = recursive_forecast(rocket, history=hist, steps=h2, window_length=L)

# Show the last 120 history points for context, then truth vs. forecast.
fig = go.Figure()
fig.add_trace(go.Scatter(x=y.index[start - 120 : start], y=y.to_numpy()[start - 120 : start], name="history", line=dict(color="rgba(0,0,0,0.35)")))
fig.add_trace(go.Scatter(x=t_h2, y=truth, name="truth", line=dict(color="black")))
fig.add_trace(go.Scatter(x=t_h2, y=rec, name="recursive forecast", line=dict(color="#59A14F")))
fig.update_layout(title="Recursive multi-step forecast (reduction)", xaxis_title="date", yaxis_title="value")
fig.show()
# Residual diagnostics (ROCKET, test horizon)
resid = y_test - pred_rocket
print("residual mean:", float(np.mean(resid)))
print("residual std:", float(np.std(resid, ddof=1)))
# Jarque-Bera tests normality of the residuals (high p-value -> can't reject).
print("Jarque-Bera:", stats.jarque_bera(resid))

# Residual autocorrelation with an approximate 95% white-noise band.
lags, acf_vals = _acf(resid, max_lag=30)
bound = 1.96 / np.sqrt(resid.size)

# QQ data
# Standardized sample quantiles vs. theoretical normal quantiles
# at plotting positions (i - 0.5) / n.
nq = resid.size
p = (np.arange(1, nq + 1) - 0.5) / nq
theoretical = stats.norm.ppf(p)
sample_q = np.sort((resid - resid.mean()) / resid.std(ddof=1))

# 2x2 diagnostic panel: time plot, histogram, ACF, QQ plot.
fig = make_subplots(
    rows=2,
    cols=2,
    subplot_titles=("Residuals over time", "Residual histogram", "Residual ACF", "QQ plot (std residuals)"),
)

fig.add_trace(go.Scatter(x=t_test, y=resid, name="residuals", line=dict(color="#59A14F")), row=1, col=1)
fig.add_hline(y=0, line=dict(color="black", dash="dash"), row=1, col=1)

fig.add_trace(go.Histogram(x=resid, nbinsx=25, name="hist", marker_color="#59A14F"), row=1, col=2)

# ACF bars with the +/- bound drawn as dashed reference lines.
fig.add_trace(go.Bar(x=lags, y=acf_vals, name="ACF(resid)", marker_color="#59A14F"), row=2, col=1)
fig.add_trace(go.Scatter(x=[0, lags.max()], y=[bound, bound], mode="lines", line=dict(color="gray", dash="dash"), showlegend=False), row=2, col=1)
fig.add_trace(go.Scatter(x=[0, lags.max()], y=[-bound, -bound], mode="lines", line=dict(color="gray", dash="dash"), showlegend=False), row=2, col=1)

# QQ scatter plus the y = x reference line.
fig.add_trace(go.Scatter(x=theoretical, y=sample_q, mode="markers", name="QQ", marker=dict(color="#59A14F")), row=2, col=2)
fig.add_trace(
    go.Scatter(x=[theoretical.min(), theoretical.max()], y=[theoretical.min(), theoretical.max()], mode="lines", line=dict(color="black", dash="dash"), showlegend=False),
    row=2,
    col=2,
)

fig.update_layout(height=750, title="Residual diagnostics (ROCKET)")
fig.show()
residual mean: 0.41093602627152837
residual std: 0.7574081863879396
Jarque-Bera: SignificanceResult(statistic=3.633108641106042, pvalue=0.1625850026815621)